#include "config.h"

#include <sys/statvfs.h>
#include <sys/epoll.h>
#include <time.h>
#include <unistd.h>
#include <string.h>
#include <semaphore.h>
#include <errno.h>
#include <sys/statfs.h>

#define DBG_SUBSYS S_LIBCONTROL

#include "configure.h"
#include "cluster.h"
#include "squeue.h"
#include "cache.h"
#include "net_global.h"
#include "volume_bh.h"
#include "job_dock.h"
#include "pool_ctl.h"
#include "stor_ctl.h"
#include "core.h"
#include "volume_ctl.h"
#include "../replica/replica.h"
#include "../storage/stor_rpc.h"
#include "dbg.h"

//#define BH_QUEUE_THREAD 10

/* One background-task queue together with the worker that drains it. */
typedef struct {
        sy_spinlock_t lock;     /* taken around every squeue_* call on `queue` */
        worker_handler_t sem;   /* worker handle; posted whenever a task is submitted */
        squeue_t queue;         /* pending entry_t tasks, keyed by chunk id + name */
} queue_t;

/*
 * Container for all background queues. The trailing array is sized at
 * allocation time in volume_bh_init() with gloconf.cleanup_thread slots.
 */
typedef struct {
        queue_t queue[0];
} bh_queue_t;

/* A queued background task. Two tasks are the same when id and name match. */
typedef struct {
        chkid_t id;                 /* chunk id the task operates on */
        int op;                     /* BH_* flag selecting the operation to run */
        char name[MAX_NAME_LEN];    /* task name; empty string when submitted without one */
} entry_t;

/* Queue array shared by all submitters; allocated in volume_bh_init(). */
static bh_queue_t *bh_queue;

/**
 * Hash callback for the task squeue.
 *
 * @param i pointer to an entry_t used as lookup key
 * @return numeric chunk id plus the string hash of the name, truncated to 32 bits
 */
static uint32_t __volume_bh_hash(const void *i)
{
        const entry_t *ent = i;
        uint32_t hash = hash_str(ent->name);

        hash += ent->id.id;

        return hash;
}

/**
 * Equality callback for the task squeue.
 *
 * @param key  entry_t being searched for
 * @param data squeue_entry_t whose `ent` member points at a stored entry_t
 * @return 1 when both chunk id and name match, 0 otherwise
 */
static int __volume_bh_equal(const void *key, const void *data)
{
        const entry_t *wanted = key;
        const squeue_entry_t *slot = data;
        const entry_t *stored = slot->ent;

        if (chkid_cmp(&wanted->id, &stored->id) != 0)
                return 0;

        return strcmp(wanted->name, stored->name) == 0;
}

/**
 * Run a BH_FILE_CLEANUP task.
 *
 * Looks up the chunk info for (ent->id, ent->name) via pool_ctl_lookup(),
 * asks the node listed first in that chunk info to clean up (direct call when
 * the node is local, RPC otherwise), then calls pool_ctl_cleanup0() for the
 * same id and name. ENOENT from the storage cleanup is only logged; EAGAIN is
 * retried through USLEEP_RETRY.
 *
 * @todo How should offline nodes be handled?
 *
 * @param ent queued task describing what to clean up
 * @return 0 on success, otherwise the error code of the failing step
 */
static int __volume_bh_filecleanup(entry_t *ent)
{
        int ret, retry = 0;
        char _chkinfo[CHKINFO_MAX];
        chkinfo_t *chkinfo = (void *)_chkinfo;
        nid_t *nid;

        DINFO("cleanup %s @ "CHKID_FORMAT" begin\n", ent->name, CHKID_ARG(&ent->id));

        /* Look up the corresponding volume controller. */
        ret = pool_ctl_lookup(&ent->id, ent->name, chkinfo);
        if (unlikely(ret))
                GOTO(err_ret, ret);

retry:
        nid = &chkinfo->diskid[0].id;
        if (net_islocal(nid)) {
                ret = stor_ctl_cleanup(&chkinfo->id, ent->name);
        } else {
                ret = stor_rpc_cleanup(nid, &chkinfo->id, ent->name);
        }

        /* Both the local and the remote path share the same error policy. */
        if (unlikely(ret)) {
                switch (ret) {
                case ENOENT:
                        DWARN("cleanup %s @ "CHKID_FORMAT" not found\n",
                              ent->name, CHKID_ARG(&ent->id));
                        break;
                case EAGAIN:
                        USLEEP_RETRY(err_ret, ret, retry, retry, 10, (1000 * 1000));
                        break;
                default:
                        GOTO(err_ret, ret);
                }
        }

        /* Remove the record from /system/unlink. */
        /* TODO need core_request */
        ret = pool_ctl_cleanup0(&ent->id, ent->name);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        DINFO("cleanup %s @ "CHKID_FORMAT" finished\n", ent->name, CHKID_ARG(&ent->id));

        return 0;
err_ret:
        if (ret != EAGAIN) {
                DERROR("cleanup %s @ "CHKID_FORMAT" fail, ret %d\n", ent->name, CHKID_ARG(&ent->id), ret);
        } else {
                DINFO("cleanup %s @ "CHKID_FORMAT" wait, ret %d\n", ent->name, CHKID_ARG(&ent->id), ret);
        }

        return ret;
}

/**
 * Run a BH_SNAP_CLEANUP task through volume_ctl_snapshot_cleanup_bh().
 *
 * An EAGAIN result (after _errno() normalisation) is retried through
 * USLEEP_RETRY; any other error ends the task.
 *
 * @param ent queued task; id and name identify the snapshot
 * @return 0 on success, otherwise the normalised error code
 */
static int __volume_bh_snapcleanup(entry_t *ent)
{
        int ret, retry = 0;

        DINFO("cleanup %s @ "CHKID_FORMAT"\n",
              ent->name, CHKID_ARG(&ent->id));

        ANALYSIS_BEGIN(0);
retry:
        ret = volume_ctl_snapshot_cleanup_bh(&ent->id, ent->name);
        if (unlikely(ret)) {
                ret = _errno(ret);
                if (ret != EAGAIN)
                        GOTO(err_ret, ret);

                DWARN("cleanup %s @ "CHKID_FORMAT" retry %u\n",
                                ent->name, CHKID_ARG(&ent->id), retry);
                USLEEP_RETRY(err_ret, ret, retry, retry, 3, (1000 * 1000));
        }

        /* Only the success path closes the timing section opened above. */
        ANALYSIS_END(0, 1000, NULL);

        DINFO("cleanup %s @ "CHKID_FORMAT" success\n",
              ent->name, CHKID_ARG(&ent->id));

        return 0;
err_ret:
        DWARN("cleanup %s @ "CHKID_FORMAT" fail\n",
              ent->name, CHKID_ARG(&ent->id));
        return ret;
}

/**
 * Run a BH_SNAP_ROLLBACK task through volume_ctl_snapshot_rollback_bh().
 *
 * An EAGAIN result (after _errno() normalisation) is retried through
 * USLEEP_RETRY; any other error is returned to the caller.
 *
 * @param ent queued task; only the chunk id is passed on
 * @return 0 on success, otherwise the normalised error code
 */
static int __volume_bh_snaprollback(entry_t *ent)
{
        int ret, retry = 0;

        DINFO("rollback "CHKID_FORMAT"\n", CHKID_ARG(&ent->id));

retry:
        ret = volume_ctl_snapshot_rollback_bh(&ent->id);
        if (unlikely(ret)) {
                ret = _errno(ret);
                if (ret != EAGAIN)
                        GOTO(err_ret, ret);

                DWARN("rollback %s @ "CHKID_FORMAT" retry %u\n",
                                ent->name, CHKID_ARG(&ent->id), retry);
                USLEEP_RETRY(err_ret, ret, retry, retry, 3, (1000 * 1000));
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Run a BH_SNAP_FLAT task through volume_ctl_snapshot_flat_bh().
 *
 * An EAGAIN result (after _errno() normalisation) is retried through
 * USLEEP_RETRY; any other error is returned to the caller.
 *
 * @param ent queued task; only the chunk id is passed on
 * @return 0 on success, otherwise the normalised error code
 */
static int __volume_bh_flat(entry_t *ent)
{
        int ret, retry = 0;

        DINFO("flat "CHKID_FORMAT"\n", CHKID_ARG(&ent->id));

retry:
        ret = volume_ctl_snapshot_flat_bh(&ent->id);
        if (unlikely(ret)) {
                ret = _errno(ret);
                if (ret != EAGAIN)
                        GOTO(err_ret, ret);

                DWARN("flat %s @ "CHKID_FORMAT" retry %u\n",
                                ent->name, CHKID_ARG(&ent->id), retry);
                USLEEP_RETRY(err_ret, ret, retry, retry, 3, (1000 * 1000));
        }

        return 0;
err_ret:
        return ret;
}

/* Argument block handed to __volume_bh_exec_remote() through core_request0(). */
typedef struct {
        entry_t *ent;   /* in: task to execute */
        int retval;     /* out: 0 on success, otherwise the task's error code */
} core_ctx_t;

/**
 * core_request0() callback: runs a rollback or flatten task.
 *
 * The result is reported through core_ctx_t::retval because the callback
 * itself returns nothing.
 *
 * @param _core_ctx pointer to a core_ctx_t carrying the task and the result slot
 */
static void __volume_bh_exec_remote(void *_core_ctx)
{
        int ret = 0;
        core_ctx_t *ctx = _core_ctx;
        entry_t *task = ctx->ent;

        if (task->op & BH_SNAP_ROLLBACK) {
                ret = __volume_bh_snaprollback(task);
        } else if (task->op & BH_SNAP_FLAT) {
                ret = __volume_bh_flat(task);
        } else {
                UNIMPLEMENTED(__DUMP__);
        }

        if (unlikely(ret))
                GOTO(err_ret, ret);

        ctx->retval = 0;
        return;
err_ret:
        ctx->retval = ret;
        return;
}

/**
 * Run a file-cleanup or snapshot-cleanup task on the calling thread.
 *
 * @param ent queued task; ent->op selects the handler
 * @return 0 on success, otherwise the handler's error code
 */
static int __volume_bh_exec_local(entry_t *ent)
{
        int ret = 0;

        if (ent->op & BH_FILE_CLEANUP) {
                ret = __volume_bh_filecleanup(ent);
        } else if (ent->op & BH_SNAP_CLEANUP) {
                ret = __volume_bh_snapcleanup(ent);
        } else {
                UNIMPLEMENTED(__DUMP__);
        }

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Execute one queued task.
 *
 * Cleanup tasks run directly on the calling thread; every other task is
 * handed to the core selected by core_hash() via core_request0().
 *
 * @param ent task to execute
 * @return 0 on success, otherwise the task's error code
 */
static int __volume_bh_exec(entry_t *ent)
{
        int ret;

        DINFO("chkid %s name %s op %d\n", id2str(&ent->id), ent->name, ent->op);

        if (ent->op & (BH_FILE_CLEANUP | BH_SNAP_CLEANUP)) {
                /* Coroutine code cannot be called directly; it needs core_request. */
                ret = __volume_bh_exec_local(ent);
        } else {
                core_ctx_t core_ctx;

                core_ctx.ent = ent;
                core_ctx.retval = 0;

                /* Runs in a separate thread, so it cannot cause a schedule_yield timeout. */
                ret = core_request0(core_hash(&ent->id), __volume_bh_exec_remote,
                                    &core_ctx, "volume_bh");
                if (unlikely(ret))
                        UNIMPLEMENTED(__DUMP__);

                /* The task's own result comes back through the context. */
                ret = core_ctx.retval;
        }

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Worker routine for one queue: runs queued tasks until the queue is empty.
 *
 * Each round fetches the first entry under the lock, executes it with the
 * lock released, then removes and frees it under the lock again. The entry
 * is only removed after the task has run, and a failed task is logged and
 * dropped like a successful one.
 *
 * @param arg the queue_t this worker serves
 * @return 0 once the queue is empty, or the error from sy_spin_lock()
 */
static int __volume_bh_worker(void *arg)
{
        int ret;
        chkid_t chkid;
        entry_t *ent;
        queue_t *queue = arg;

        for (;;) {
                ret = sy_spin_lock(&queue->lock);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = squeue_getfirst(&queue->queue, (void **)&ent);
                if (unlikely(ret) && ret != ENOENT)
                        UNIMPLEMENTED(__DUMP__);

                sy_spin_unlock(&queue->lock);

                /* Nothing left to do. */
                if (ret == ENOENT)
                        break;

                ret = __volume_bh_exec(ent);
                if (!ret) {
                        DWARN("volume "CHKID_FORMAT"/%s op %d success\n",
                              CHKID_ARG(&ent->id), ent->name, ent->op);
                } else {
                        DWARN("volume "CHKID_FORMAT"/%s op %d failed, ret %d\n",
                              CHKID_ARG(&ent->id), ent->name, ent->op, ret);
                }

                ret = sy_spin_lock(&queue->lock);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                /* Remember the id so the removed entry can be cross-checked. */
                chkid = ent->id;
                ret = squeue_remove(&queue->queue, ent, (void **)&ent);
                if (unlikely(ret))
                        UNIMPLEMENTED(__DUMP__);

                YASSERT(chkid_cmp(&chkid, &ent->id) == 0);

                sy_spin_unlock(&queue->lock);

                yfree((void **)&ent);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Pick a queue index for a named task.
 *
 * The name is parsed with str2chkid(); a non-null chunk id selects
 * id % gloconf.cleanup_thread, anything else falls back to queue 0.
 *
 * @param name task name
 * @return index into bh_queue->queue
 */
static int __volume_bh_hash__(const char *name)
{
        chkid_t parsed;

        str2chkid(&parsed, name);

        return chkid_isnull(&parsed) ? 0 : (parsed.id % gloconf.cleanup_thread);
}

/**
 * Insert a new task into the task queue.
 *
 * The target queue is chosen from the name when one is given, otherwise from
 * the chunk id. A task whose (id, name) is already queued is not added again;
 * the worker is posted in both cases.
 *
 * @param chkid chunk id the task operates on
 * @param name  task name, or NULL for tasks identified by chunk id alone
 * @param op    BH_* operation flag stored in the entry
 * @return 0 on success (including "already queued"), ENAMETOOLONG when name
 *         does not fit in entry_t::name, otherwise the error from the
 *         allocation, lock or squeue call that failed
 */
static int __volume_bh(const chkid_t *chkid, const char *name, int op)
{
        int ret;
        size_t len;
        entry_t *ent, *tmp;
        queue_t *queue;
        /* Printable form of the optional name: never hand NULL to a %s. */
        const char *label = name ? name : "";

        DBUG("file "CHKID_FORMAT"/%s, op %d\n", CHKID_ARG(chkid), label, op);

        /* Reject names that would overflow the fixed-size buffer in entry_t. */
        len = strlen(label);
        if (len >= sizeof(ent->name)) {
                ret = ENAMETOOLONG;
                GOTO(err_ret, ret);
        }

        if (name)
                queue = &bh_queue->queue[__volume_bh_hash__(name)];
        else
                queue = &bh_queue->queue[chkid->id % gloconf.cleanup_thread];

        ret = ymalloc((void**)&ent, sizeof(*ent));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ent->id = *chkid;
        ent->op = op;
        memcpy(ent->name, label, len + 1);      /* length checked above; copies the NUL too */

        ret = sy_spin_lock(&queue->lock);
        if (unlikely(ret))
                GOTO(err_free, ret);

        ret = squeue_get(&queue->queue, ent, (void **)&tmp);
        if (unlikely(ret)) {
                if (ret == ENOENT) {
                        /* Not queued yet: the queue takes ownership of ent. */
                        DINFO("file "CHKID_FORMAT"/%s, op %d\n", CHKID_ARG(chkid), label, op);
                        ret = squeue_insert(&queue->queue, ent, ent, 0);
                        if (unlikely(ret))
                                GOTO(err_lock, ret);

                        worker_post(&queue->sem);
                } else
                        GOTO(err_lock, ret);
        } else {
                /* Duplicate submission: drop our copy but still wake the worker. */
                DWARN("file "CHKID_FORMAT"/%s already in queue\n", CHKID_ARG(chkid), label);
                yfree((void **)&ent);
                worker_post(&queue->sem);
        }

        sy_spin_unlock(&queue->lock);

        return 0;
err_lock:
        sy_spin_unlock(&queue->lock);
err_free:
        yfree((void **)&ent);
err_ret:
        return ret;
}

/**
 * Queue a BH_FILE_CLEANUP task for (chkid, name).
 *
 * replica_srv_getparent() is called first; ENOENT from it is returned to the
 * caller and the task is not queued.
 *
 * @param chkid chunk id of the task
 * @param name  task name
 * @return 0 on success, otherwise an error code
 */
int volume_bh_filecleanup(const chkid_t *chkid, const char *name)
{
        int ret;
        chkid_t parent;         /* filled by the lookup; the value itself is unused */

        ret = replica_srv_getparent(chkid, &parent, NULL);
        if (unlikely(ret)) {
                if (ret != ENOENT) {
                        UNIMPLEMENTED(__DUMP__);
                } else {
                        GOTO(err_ret, ret);
                }
        }

        ret = __volume_bh(chkid, name, BH_FILE_CLEANUP);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Queue a BH_SNAP_CLEANUP task for (chkid, name).
 *
 * replica_srv_getparent() and volume_ctl_snapshot_check() must both succeed
 * before the task is queued; ENOENT from the former is returned to the caller.
 *
 * @param chkid chunk id of the task
 * @param name  snapshot name
 * @return 0 on success, otherwise an error code
 */
int volume_bh_snapcleanup(const chkid_t *chkid, const char *name)
{
        int ret;
        chkid_t parent;         /* filled by the lookup; the value itself is unused */

        ret = replica_srv_getparent(chkid, &parent, NULL);
        if (unlikely(ret)) {
                if (ret != ENOENT) {
                        UNIMPLEMENTED(__DUMP__);
                } else {
                        GOTO(err_ret, ret);
                }
        }

        ret = volume_ctl_snapshot_check(chkid, name);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __volume_bh(chkid, name, BH_SNAP_CLEANUP);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * @brief Submit a background task.
 *
 * @todo How can a controller change be detected and reported to the
 *       application layer?
 *
 * @note Must not use the volume ctl mechanism; it would cause a deadlock.
 *
 * @param chkid        chunk id of the volume
 * @param op           BH_SNAP_ROLLBACK | BH_SNAP_FLAT
 * @param check_exists when non-zero, volume_ctl_check_exists() is consulted
 *                     first and EREMCHG is returned if the volume is absent
 * @return 0 on success, otherwise an error code
 */
int volume_bh_add(const chkid_t *chkid, int op, int check_exists)
{
        int ret;
        int exists = 0;

        YASSERT(op == BH_SNAP_ROLLBACK || op == BH_SNAP_FLAT);

        if (check_exists) {
                ret = volume_ctl_check_exists(chkid, &exists);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                if (exists == 0) {
                        ret = EREMCHG;
                        GOTO(err_ret, ret);
                }
        }

        /* These tasks carry no name; they are keyed by chunk id alone. */
        ret = __volume_bh(chkid, NULL, op);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * @brief Set up the background task queues.
 *
 * Allocates gloconf.cleanup_thread queues and, for each one, initialises its
 * lock and squeue and creates its "volume_bh" worker.
 *
 * @return 0 on success, EINVAL when gloconf.cleanup_thread is not positive,
 *         otherwise the error of the step that failed
 */
int volume_bh_init(void)
{
        int ret, i;
        queue_t *queue;

        /*
         * The queue count is used as a modulus when tasks are submitted, so
         * zero would mean an empty array here and a division by zero later.
         */
        if (gloconf.cleanup_thread < 1) {
                ret = EINVAL;
                GOTO(err_ret, ret);
        }

        ret = ymalloc((void**)&bh_queue, sizeof(*bh_queue)
                      + sizeof(queue_t) * gloconf.cleanup_thread);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        for (i = 0; i < gloconf.cleanup_thread; i++) {
                queue = &bh_queue->queue[i];

                ret = sy_spin_init(&queue->lock);
                if (unlikely(ret))
                        GOTO(err_free, ret);

                ret = squeue_init(&queue->queue, 1024, __volume_bh_equal, __volume_bh_hash);
                if (unlikely(ret))
                        GOTO(err_free, ret);

                ret = worker_create(&queue->sem, "volume_bh", __volume_bh_worker,
                                    NULL, queue, WORKER_TYPE_SEM, 0);
                if (unlikely(ret))
                        GOTO(err_free, ret);
        }

        DINFO("file cleanup service start\n");

        return 0;
err_free:
        /*
         * NOTE(review): workers created in earlier iterations were handed
         * pointers into this array and are not torn down here — confirm that
         * a failed init is treated as fatal by the caller.
         */
        yfree((void **)&bh_queue);
err_ret:
        return ret;
}
